package com;


import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@SuppressWarnings("all")
/**
 * Small producer/consumer driver that writes {@link FlyParam} rows to ORC files.
 *
 * <p>One producer thread builds records and hands them over through {@link #queue};
 * {@code CONSUMER_COUNT} tasks running on a thread pool take records, serialize them with
 * {@code OrcSerde} and append them to per-thread ORC files, rolling to a new file periodically.
 * When the producer has handed over its last record it sets {@link #parsecomplete}; consumers
 * then close their current writer and return, and the pool is shut down so the JVM can exit.
 */
public class ORCArrayWriter6112 {

    /** Number of records the producer generates. */
    private static final int RECORD_COUNT = 120000;

    /** Number of float values stored in each record. */
    private static final int VALUES_PER_RECORD = 15000;

    /** Number of consumer tasks submitted to the pool (also used in the progress estimate). */
    private static final int CONSUMER_COUNT = 5;

    /** A consumer rolls to a new output file whenever its pre-increment counter is a multiple of this. */
    private static final int ROLL_INTERVAL = 1000;

    /** How long a consumer waits for a record before re-checking {@link #parsecomplete}. */
    private static final long POLL_TIMEOUT_MS = 100;

    // A bounded LinkedBlockingQueue<FlyParam>(100) was the earlier alternative to the direct hand-off below.
    /** Fair hand-off queue: a {@code put} returns only once a consumer has received the record. */
    static SynchronousQueue<FlyParam> queue = new SynchronousQueue<FlyParam>(true);

    /**
     * Set by the producer after its last record was handed over (or after it was interrupted).
     * Volatile because it is written by the producer thread and read by the consumer threads.
     */
    static volatile boolean parsecomplete = false;

    /**
     * Starts the producer thread, submits the consumer tasks and initiates an orderly pool shutdown.
     *
     * @param args unused
     * @throws Exception declared for backward compatibility with the original signature
     */
    public static void main(String[] args) throws Exception {
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                produce();
            }
        }, "fly-producer");
        // Daemon: if every consumer dies the producer would block in put() forever;
        // as a daemon it cannot keep the JVM alive on its own in that case.
        producer.setDaemon(true);
        producer.start();

        // A plain Runnable is enough: the pool supplies the threads.
        Runnable task = new Runnable() {
            @Override
            public void run() {
                consume();
            }
        };

        ExecutorService service = new ThreadPoolExecutor(12, 20, 10000000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    // Give the work queue a short moment to free a slot; report if the task is still dropped.
                    if (!executor.getQueue().offer(r, 10, TimeUnit.MILLISECONDS)) {
                        System.err.println("task rejected: " + r);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        });

        for (int i = 0; i < CONSUMER_COUNT; i++) {
            service.submit(task);
        }
        // Already-submitted consumers keep running; idle workers exit afterwards so the JVM can terminate.
        service.shutdown();
    }

    /**
     * Builds {@code RECORD_COUNT} records of {@code VALUES_PER_RECORD} floats each and hands them to
     * the consumers, printing progress every 100 records. Always raises {@link #parsecomplete} on exit.
     */
    private static void produce() {
        long start = System.currentTimeMillis();
        try {
            for (int k = 0; k < RECORD_COUNT; k++) {
                List<Float> list = new ArrayList<Float>(VALUES_PER_RECORD);
                for (int j = 0; j < VALUES_PER_RECORD; j++) {
                    list.add(j + 0.123445f);
                }
                queue.put(new FlyParam("a", (start + k) + "", list));
                if (k % 100 == 0) {
                    long end = System.currentTimeMillis();
                    System.out.println("-----------------------------k=>" + k + ", cost=>" + (end - start) + "------------------------");
                }
            }
        } catch (InterruptedException e) {
            // Stop producing and keep the interrupt visible instead of retrying in a loop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // File-close notification: tells the consumers that no further records will arrive.
            parsecomplete = true;
        }
    }

    /**
     * Consumer loop: takes records from {@link #queue}, writes them to an ORC file named after the
     * current thread id and rolls to a new file on the roll interval. Returns once the producer has
     * finished and no record is pending; the current writer is closed on every exit path.
     */
    private static void consume() {
        RecordWriter writer = null;
        try {
            JobConf conf = new JobConf();
            StructObjectInspector inspector =
                    (StructObjectInspector) ObjectInspectorFactory
                            .getReflectionObjectInspector(FlyParam.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            OrcSerde serde = new OrcSerde();
            OutputFormat outFormat = new OrcOutputFormat();
            // FileSystem.get returns a cached instance shared by all consumers, so it is
            // deliberately never closed here: closing it could break the other threads' writers.
            FileSystem fs = FileSystem.get(conf);
            conf.setCompressMapOutput(true);
            ((OrcOutputFormat) outFormat).setCompressOutput(conf, true);
            long threadId = Thread.currentThread().getId();
            writer = openWriter(outFormat, fs, conf, "data/fly-" + threadId + ".orc");
            int i = 0;
            long s1 = System.currentTimeMillis();
            while (true) {
                // Timed poll instead of take() so the completion flag is re-checked regularly.
                FlyParam param = queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (param == null) {
                    if (parsecomplete) {
                        // put() returns only after hand-off, so nothing can still be pending here.
                        break;
                    }
                    continue;
                }
                writer.write(NullWritable.get(), serde.serialize(param, inspector));
                // NOTE(review): the condition is true for the very first record (i == 0), so the
                // initial file holds a single row before the first roll - kept as in the original.
                if (i++ % ROLL_INTERVAL == 0) {
                    long e1 = System.currentTimeMillis();
                    // The total is an estimate: this thread's count times the number of consumers.
                    // NOTE(review): the average uses integer division, as in the original.
                    System.out.println(Thread.currentThread() + "all thread total write complete:" + (i * CONSUMER_COUNT) + ", cost=>" + (e1 - s1) + ",avg=>" + ((e1 - s1) / (i * CONSUMER_COUNT)) + "毫秒/条");

                    // Clear the reference before closing so a failure cannot trigger a second close in finally.
                    RecordWriter finished = writer;
                    writer = null;
                    finished.close(Reporter.NULL);
                    writer = openWriter(outFormat, fs, conf, "data/fly-" + threadId + "-" + i + ".orc");
                    System.out.println("write success .");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    // Finalise the last file; without this its contents would never be completed.
                    writer.close(Reporter.NULL);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Opens a record writer for {@code name} through the given output format. */
    private static RecordWriter openWriter(OutputFormat outFormat, FileSystem fs, JobConf conf, String name)
            throws IOException {
        return outFormat.getRecordWriter(fs, conf, name, Reporter.NULL);
    }

    /**
     * Row type written to the ORC files; its fields are exposed to the serde through a
     * reflection-based object inspector.
     *
     * <p>Hive table definition noted by the original author:
     * <pre>
     * CREATE EXTERNAL TABLE fly(
     * channel string,
     * time string,
     * params array&lt;string&gt;
     * )STORED AS ORC
     * location '/tmp/orc/';
     * </pre>
     * NOTE(review): {@code params} is a {@code List<Float>} here, so {@code array<float>} is
     * presumably the matching column type - confirm against the actual table.
     */
    static class FlyParam implements Writable {
        String channel;
        String time;
        List<Float> params;


        /** @return the float values of this row */
        public List<Float> getParams() {
            return params;
        }


        /**
         * @param channel channel name
         * @param time    timestamp, as a string
         * @param values  float values of the row (stored by reference, not copied)
         */
        FlyParam(String channel, String time, List<Float> values) {
            this.channel = channel;
            this.time = time;
            this.params = values;
        }

        /** Not supported: rows of this type are never deserialized through {@code Writable}. */
        @Override
        public void readFields(DataInput arg0) throws IOException {
            throw new UnsupportedOperationException("no read");
        }

        /** Not supported: rows of this type are never serialized through {@code Writable}. */
        @Override
        public void write(DataOutput arg0) throws IOException {
            throw new UnsupportedOperationException("no write");
        }

    }
}